package com.shujia.flinkcore;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

/**
 * Streaming word count over a sliding processing-time window.
 *
 * <p>Reads text lines from a socket, splits each line on {@code ","}, and prints the
 * per-word count for every window that is 10 seconds long and slides every 5 seconds.
 */
public class SocketWindow {
    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception if {@code env.execute()} fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Window definition: 10-second windows, evaluated every 5 seconds, on processing time.
        Time windowSize = Time.seconds(10);
        Time slideInterval = Time.seconds(5);
        SlidingProcessingTimeWindows slidingWindows =
                SlidingProcessingTimeWindows.of(windowSize, slideInterval);

        // Source: raw text lines from the socket.
        DataStreamSource<String> lines = env.socketTextStream("hadoop102", 8888);

        // Each line is a comma-separated list; emit every piece as its own record.
        // The explicit Types.STRING is the output type information for the lambda.
        SingleOutputStreamOperator<String> tokens = lines.flatMap(
                (line, out) -> Arrays.stream(line.split(",")).forEach(out::collect),
                Types.STRING);

        // Pair every token with an initial count of 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = tokens.map(
                token -> Tuple2.of(token, 1),
                Types.TUPLE(Types.STRING, Types.INT));

        // Group by the word (field f0) and sum the count (tuple position 1) per window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = pairs
                .keyBy(pair -> pair.f0)
                .window(slidingWindows)
                .sum(1);

        counts.print();
        env.execute();
    }
}
